原文:Topics
状态:待校对
翻译:Bingjian-Zhu
校对:

CC-BY-SA

为什么需要topic交换机?

(使用Go客户端)

上一篇教程,我们改进了我们的日志系统。我们使用direct交换机替代了fanout交换机,从只能盲目的广播消息改进为有可能选择性的接收日志。

尽管direct交换机能够改善我们的系统,但是它也有它的限制 —— 没办法基于多个标准执行路由操作。

在我们的日志系统中,我们不只希望订阅基于严重程度的日志,同时还希望订阅基于发送来源的日志。Unix工具syslog就是同时基于严重程度-severity (info/warn/crit…) 和 设备-facility (auth/cron/kern…)来路由日志的。

如果这样的话,将会给予我们非常大的灵活性,我们既可以监听来源于“cron”的严重程度为“critical errors”的日志,也可以监听来源于“kern”的所有日志。

为了实现这个目的,接下来我们学习如何使用另一种更复杂的交换机 —— topic交换机。

topic交换机

发送到topic交换机的消息不可以携带随意routing_key,它的routing_key必须是一个由.分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。以下是几个推荐的例子:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。词语的个数可以随意,但是不要超过255字节。

binding key也必须拥有同样的格式。topic交换机背后的逻辑跟direct交换机很相似 —— 一个携带着特定routing_key的消息会被topic交换机投递给绑定键与之想匹配的队列。但是它的binding key和routing_key有两个特殊应用方式:

  • * (星号) 用来表示一个单词.
  • # (井号) 用来表示任意数量(零个或多个)单词。

下边用图说明:
None

这个例子里,我们发送的所有消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.分割开。路由键里的第一个单词描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类。所以它看起来是这样的: <celerity>.<colour>.<species>

我们创建了三个绑定:Q1的绑定键为 *.orange.*,Q2的绑定键为 *.*.rabbitlazy.#

这三个绑定键被可以总结为:

  • Q1 对所有的桔黄色动物都感兴趣。
  • Q2 则是对所有的兔子所有懒惰的动物感兴趣。

一个携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant 的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox 的消息会投递给第一个队列,携带有 lazy.brown.fox 的消息会投递给第二个队列。携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox 的消息不会投递给任何一个队列。

如果我们违反约定,发送了一个携带有一个单词或者四个单词("orange" or "quick.orange.male.rabbit")的消息时,发送的消息不会投递给任何一个队列,而且会丢失掉。

但是另一方面,即使 "lazy.orange.male.rabbit" 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。

Topic交换机

Topic交换机是很强大的,它可以表现出跟其他交换机类似的行为 当一个队列的binding key为 “#”(井号) 的时候,这个队列将会无视消息的routing key,接收所有的消息。 当 * (星号) 和 # (井号) 这两个特殊字符都未在binding key中出现的时候,此时Topic交换机就拥有的direct交换机的行为。

代码整合

接下来我们会将Topic交换机应用到我们的日志系统中。在开始工作前,我们假设日志的routing key由两个单词组成,routing key看起来是这样的:<facility>.<severity>

代码跟上一篇教程差不多。

emit_log_topic.go的代码:

  1. package main
  2. import (
  3. "log"
  4. "os"
  5. "strings"
  6. "github.com/streadway/amqp"
  7. )
  8. func failOnError(err error, msg string) {
  9. if err != nil {
  10. log.Fatalf("%s: %s", msg, err)
  11. }
  12. }
  13. func main() {
  14. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  15. failOnError(err, "Failed to connect to RabbitMQ")
  16. defer conn.Close()
  17. ch, err := conn.Channel()
  18. failOnError(err, "Failed to open a channel")
  19. defer ch.Close()
  20. err = ch.ExchangeDeclare(
  21. "logs_topic", // name
  22. "topic", // type
  23. true, // durable
  24. false, // auto-deleted
  25. false, // internal
  26. false, // no-wait
  27. nil, // arguments
  28. )
  29. failOnError(err, "Failed to declare an exchange")
  30. body := bodyFrom(os.Args)
  31. err = ch.Publish(
  32. "logs_topic", // exchange
  33. severityFrom(os.Args), // routing key
  34. false, // mandatory
  35. false, // immediate
  36. amqp.Publishing{
  37. ContentType: "text/plain",
  38. Body: []byte(body),
  39. })
  40. failOnError(err, "Failed to publish a message")
  41. log.Printf(" [x] Sent %s", body)
  42. }
  43. func bodyFrom(args []string) string {
  44. var s string
  45. if (len(args) < 3) || os.Args[2] == "" {
  46. s = "hello"
  47. } else {
  48. s = strings.Join(args[2:], " ")
  49. }
  50. return s
  51. }
  52. func severityFrom(args []string) string {
  53. var s string
  54. if (len(args) < 2) || os.Args[1] == "" {
  55. s = "anonymous.info"
  56. } else {
  57. s = os.Args[1]
  58. }
  59. return s
  60. }

receive_logs_topic.go的代码:

  1. package main
  2. import (
  3. "log"
  4. "os"
  5. "github.com/streadway/amqp"
  6. )
  7. func failOnError(err error, msg string) {
  8. if err != nil {
  9. log.Fatalf("%s: %s", msg, err)
  10. }
  11. }
  12. func main() {
  13. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  14. failOnError(err, "Failed to connect to RabbitMQ")
  15. defer conn.Close()
  16. ch, err := conn.Channel()
  17. failOnError(err, "Failed to open a channel")
  18. defer ch.Close()
  19. err = ch.ExchangeDeclare(
  20. "logs_topic", // name
  21. "topic", // type
  22. true, // durable
  23. false, // auto-deleted
  24. false, // internal
  25. false, // no-wait
  26. nil, // arguments
  27. )
  28. failOnError(err, "Failed to declare an exchange")
  29. q, err := ch.QueueDeclare(
  30. "", // name
  31. false, // durable
  32. false, // delete when usused
  33. true, // exclusive
  34. false, // no-wait
  35. nil, // arguments
  36. )
  37. failOnError(err, "Failed to declare a queue")
  38. if len(os.Args) < 2 {
  39. log.Printf("Usage: %s [binding_key]...", os.Args[0])
  40. os.Exit(0)
  41. }
  42. for _, s := range os.Args[1:] {
  43. log.Printf("Binding queue %s to exchange %s with routing key %s",
  44. q.Name, "logs_topic", s)
  45. err = ch.QueueBind(
  46. q.Name, // queue name
  47. s, // routing key
  48. "logs_topic", // exchange
  49. false,
  50. nil)
  51. failOnError(err, "Failed to bind a queue")
  52. }
  53. msgs, err := ch.Consume(
  54. q.Name, // queue
  55. "", // consumer
  56. true, // auto ack
  57. false, // exclusive
  58. false, // no local
  59. false, // no wait
  60. nil, // args
  61. )
  62. failOnError(err, "Failed to register a consumer")
  63. forever := make(chan bool)
  64. go func() {
  65. for d := range msgs {
  66. log.Printf(" [x] %s", d.Body)
  67. }
  68. }()
  69. log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
  70. <-forever
  71. }

执行下边命令 接收所有日志:

  1. go run receive_logs_topic.go "#"

执行下边命令 接收来自”kern“设备的日志:

  1. go run receive_logs_topic.go "kern.*"

执行下边命令 只接收严重程度为”critical“的日志:

  1. go run receive_logs_topic.go "*.critical"

执行下边命令 建立多个绑定:

  1. go run receive_logs_topic.go "kern.*" "*.critical"

执行下边命令 发送路由键为 “kern.critical” 的日志:

  1. go run emit_log_topic.go "kern.critical" "A critical kernel error"

执行上边命令试试看效果吧。另外,上边代码不会对路由键和绑定键做任何假设,所以你可以在命令中使用超过两个路由键参数。

如果你现在还没被搞晕,想想下边问题:

  • 绑定键为 * 的队列会取到一个routing key为空的消息吗?
  • 绑定键为 #.* 的队列会获取到一个名为..的路由键的消息吗?它会取到一个routing key为单个单词的消息吗?
  • a.*.#a.#的区别在哪儿?

(完整代码参见emit_logs_topic.go and receive_logs_topic.go)